package event

import (
	"context"
	"encoding/json"
	"github.com/streadway/amqp"
)

/*
   @author:zhongyang
   @date:2023/7/27
   @description:rabbitmq代码
*/

type RabbitMQServer struct {
	ch *amqp.Channel
}

// NewRabbitMQServer 这里不关链接
func NewRabbitMQServer() (*RabbitMQServer, error) {
	// 链接服务器
	conn, err := amqp.Dial("amqp://admin:admin@localhost:5672/")
	if err != nil {
		return nil, err
	}
	channel, err := conn.Channel()
	if err != nil {
		return nil, err
	}
	return &RabbitMQServer{
		ch: channel,
	}, nil
}

func (r *RabbitMQServer) Publish(ctx context.Context, msg *Message) (string, error) {
	// 消息内容
	body, _ := json.Marshal(msg.Body)
	// 推送消息
	err := r.ch.Publish(
		msg.Topic,      // exchange（交换机名字，跟前面声明对应）
		msg.RoutingKey, // 路由参数，fanout类型交换机，自动忽略路由参数，填了也没用。
		false,          // mandatory
		false,          // immediate
		amqp.Publishing{
			ContentType: "text/plain", // 消息内容类型，这里是普通文本
			Body:        body,         // 消息内容
		})
	if err != nil {
		return "", err
	}
	return "", nil
}

func (r *RabbitMQServer) PublishDelay(ctx context.Context, msg *DelayMessage) (string, error) {
	// 消息内容
	body, _ := json.Marshal(msg.Message.Body)
	// 推送消息
	publishing := amqp.Publishing{
		ContentType: "text/plain", // 消息内容类型，这里是普通文本
		Body:        body,         // 消息内容
		Expiration:  msg.Delay,
		Headers: map[string]interface{}{
			"x-delay": msg.Delay, // 消息从交换机过期时间,毫秒（x-dead-message插件提供）
		},
	}
	err := r.ch.Publish(
		msg.Topic,      // exchange（交换机名字，跟前面声明对应）
		msg.RoutingKey, // 路由参数，fanout类型交换机，自动忽略路由参数，填了也没用。
		false,          // mandatory
		false,          // immediate
		publishing)
	if err != nil {
		return "", err
	}
	return "", nil
}
